1 支持的数据类型

数据类型 描述 值域
VARCHAR 可变长度字符串 最大容量为4mb
BOOLEAN 逻辑值 值:TRUE,FALSE,UNKNOWN
TINYINT 微整型 1字节整数 范围是-128到127
SMALLINT 短整型,2字节整数 范围为-32768至32767
INT 整型,4字节整数 范围是-2147483648到2147483647
BIGINT 长整型,8字节整数 范围是-9223372036854775808至9223372036854775807
FLOAT 4字节浮点数 6位数字精度
DECIMAL 小数类型 示例:123.45是DECIMAL(5,2)值。
DOUBLE 浮点,8字节浮点数 15位十进制精度
DATE 日期类型 示例:DATE'1969-07-20'
TIME 时间类型 示例:TIME '20:17:40'
TIMESTAMP 时间戳,日期和时间 示例:TIMESTAMP'1969-07-20 20:17:40'
VARBINARY 二进制数据 即 byte[] 数组

2 DDL语句

2.1 Source Table 定义数据源表

Kafka Source Table
目前只支持kafka
CREATE SOURCE TABLE orders (
    userid '/user/userid'  varchar,
    money bigint
)
WITH (
    type = 'kafka', -- 固定值
    topic = 'flink-topic-input', -- kafka topic
    encode = 'json', -- 支持kafka 消息格式为:json 和 csv 两种格式,json 支持嵌套,但需要在column定义时,指定path,如果上面实例中:/user/userid
    delimiter = '|', -- 如果encode为csv,设置分隔符,默认为逗号
    kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092',  --开发borker server地址
    kafka.group.id = 'flink-example-group' -- 消费组名称
)
TIMESTAMP BY proctime proctime;

2.2 Sink Table 定义数据输出表

Kafka Sink Table 案例

CREATE SINK TABLE stat_orders (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    userid varchar,
    total_money bigint
)
WITH (
    type = 'kafka', -- 固定值
    topic = 'flink-topic-output', -- kafka topic
    encode = 'json', -- 支持kafka 消息格式为:json 和 csv 两种格式。
    delimiter = '|', -- 如果encode为csv,设置分隔符,默认为逗号
    kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092',  --开发borker server地址
)
TIMESTAMP BY proctime proctime;

InfluxDb Sink Table案例

CREATE SINK TABLE stat_orders (
    _measurement varchar, --  _measurement字段必须要存在
    _tag1 varchar, --tag字段可选,tag字段必须以_tag开头,若有多个tag,在_tag后面加数字
    _tag2 varchar,
    cpu varchar, -- field的名称
    io varchar -- field的名称
)
WITH (
    type = 'influxdb', -- 固定值
    influxdb.url = http://10.57.17.82:8086, --必选参数
    influxdb.username = test,  --必选参数
    influxdb.password = test,  --必选参数
    influxdb.database = testdb,  --必选参数
    influxdb.retention.policy = default,  --非必选参数

    influxdb.batch.actions = 100 --触发批次写入的条数, 非必须参数
    influxdb.flush.duration = 100 --触发批次写入的时长(单位: ms), 非必须参数
    influxdb.compress.gz = true --开启gz压缩,默认为false,非必须参数
)
TIMESTAMP BY proctime proctime;

Aerospike Sink Table

第一个字段作为主键
CREATE SINK TABLE stat_orders (
    userid varchar,
    total_money bigint
)
WITH (
    type = 'aerospike', -- 固定值
    aerospike.zookeep.servers = 'xxx' --使用公司封装的client,通过zk获取asp连接地址
    aerospike.code = 'xxx',
    aerospike.key.ttl = 3600000 --过期时间
)
TIMESTAMP BY proctime proctime;

3 DML语句

INSERT INTO语句

语法格式
 INSERT INTO tableName(,tableName)* queryStatement;

queryStatement 完成语法以及相关函数,请参考flik官方文档

示例:

INSERT INTO LargeOrders
SELECT * FROM Orders WHERE units > 1000;

INSERT INTO Orders(z, v)
SELECT c,d FROM OO;

说明

一个作业支持一个source table, 一个insert sql,支持多个sink table, 目的是一次计算,输出到多个sink table中
流计算不支持单独的select 查询,必须有CREATE VIEW 或这是在 INSERT INTO内才能操作。
INSERT INTO 支持UPDATA更新,例如向TIDB的表插入一个KEY值,如果这个KEY值存在就会更新;如果不存在就会插入一条新的KEY值。

4 SQL example

利用自定义函数对数据进行清洗加工

create function demoFunc as 'cn.tongdun.streamcompute.metrics.UdtfTest' USING streamcompute-udf-1.0-SNAPSHOT.jar;

            CREATE SOURCE TABLE orders (
                userid  varchar,
                money bigint
            )
            WITH (
                type = 'kafka',
                topic = 'sc-dev-topic-input1',
                encode = 'csv',
            --  delimiter = '|',
                kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092',
                kafka.group.id = 'streamcompute-dev-group1'
            )
            TIMESTAMP BY proctime proctime;


            CREATE SINK TABLE stat_orders_kafka (
                window_start TIMESTAMP,
                window_end TIMESTAMP,
                userid varchar,
                total_money bigint
            )
            WITH (
                type = 'kafka',
                topic = 'sc-dev-topic-output',
                encode = 'csv',
                kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092'
            );

            insert into stat_orders_kafka
            SELECT  
                TUMBLE_START(proctime, INTERVAL '10' SECOND),
                TUMBLE_END(proctime, INTERVAL '10' SECOND),
                 newuserid,
                SUM(money) as total_money  
            FROM (select userid, newuserid, money, proctime from orders LEFT JOIN LATERAL TABLE(demoFunc(userid)) as T(newuserid) ON TRUE) a
            GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND),newuserid;

results matching ""

    No results matching ""